/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "aodatapage.h"

/*
data_page :
|data_page_header|data_page_user_header|data_page_body|

data_page_body :
|data_unit 1|data_unit 2|data_unit 3|...|

data_unit :
|data_unit_header|data_unit_user_header|data_unit_user_body|post_data_unit_user_reserved|
*/

#define AODATAPAGE_HEADER_VERSION		1

struct AppendonlyDataPageHeader
{
	char			magic[4] ;
	uint16_t		datapage_header_version ;
	
	uint64_t		datapage_size ;
	uint16_t		datapage_user_header_size ;
	uint16_t		dataunit_user_header_size ;
	uint16_t		post_dataunit_reserved_len ;
	
	char			*next_dataunit ;
} ;

struct AppendonlyDataUnitHeader
{
	uint64_t		dataunit_user_len ;
} ;

TLS int	_tls_datapage_last_error = 0 ;

int GetAppendonlyDataPageLastError()
{
	return _tls_datapage_last_error;
}

char *CreateAppendonlyDataPage( char magic[4] , uint64_t datapage_size , uint16_t datapage_user_header_size , uint16_t dataunit_user_header_size , uint16_t post_dataunit_reserved_len )
{
	char			*datapage = NULL ;
	struct AppendonlyDataPageHeader	*datapage_header = NULL ;
	
	datapage = (char*)malloc( (size_t)datapage_size ) ;
	if( datapage == NULL )
	{
		_tls_datapage_last_error = AODATAPAGE_ERROR_ALLOC ;
		return NULL;
	}
	memset( datapage , 0x00 , (size_t)datapage_size );
	
	datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	datapage_header->magic[0] = magic[0] ;
	datapage_header->magic[1] = magic[1] ;
	datapage_header->magic[2] = magic[2] ;
	datapage_header->magic[3] = magic[3] ;
	datapage_header->datapage_header_version = AODATAPAGE_HEADER_VERSION ;
	datapage_header->datapage_size = datapage_size ;
	datapage_header->datapage_user_header_size = datapage_user_header_size ;
	datapage_header->dataunit_user_header_size = dataunit_user_header_size ;
	datapage_header->post_dataunit_reserved_len = post_dataunit_reserved_len ;
	datapage_header->next_dataunit = datapage + sizeof(struct AppendonlyDataPageHeader) + datapage_user_header_size ;
	
	return datapage;
}

void DestroyAppendonlyDataPage( char *datapage )
{
	free( datapage );
	
	return;
}

void *GetAppendonlyDataPageUserHeader( char *datapage )
{
	struct AppendonlyDataPageHeader	*datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	
	if( datapage_header->datapage_header_version != AODATAPAGE_HEADER_VERSION )
	{
		_tls_datapage_last_error = AODATAPAGE_ERROR_VERSION_NOT_MATCH ;
		return NULL;
	}
	
	return datapage+sizeof(struct AppendonlyDataPageHeader);
}

char *AddAppendonlyDataUnit( char *datapage , void *dataunit_user_header , void *dataunit_user_body , uint64_t dataunit_user_body_len )
{
	struct AppendonlyDataPageHeader	*datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	uint64_t			dataunit_user_len ;
	struct AppendonlyDataUnitHeader	*dataunit_header = NULL ;
	char				*curr_dataunit = NULL ;
	
	if( datapage_header->datapage_header_version != AODATAPAGE_HEADER_VERSION )
	{
		_tls_datapage_last_error = AODATAPAGE_ERROR_VERSION_NOT_MATCH ;
		return NULL;
	}

	curr_dataunit = datapage_header->next_dataunit ;
	
	dataunit_header = (struct AppendonlyDataUnitHeader *)(datapage_header->next_dataunit) ;
	dataunit_user_len = datapage_header->dataunit_user_header_size + dataunit_user_body_len + datapage_header->post_dataunit_reserved_len ;
	if( (uint64_t)(datapage-datapage_header->next_dataunit) + datapage_header->datapage_size <= (uint64_t)(sizeof(struct AppendonlyDataUnitHeader)) + dataunit_user_len )
	{
		_tls_datapage_last_error = AODATAPAGE_WARN_SPACE_ISNOT_ENOUGH ;
		return NULL;
	}
	
	dataunit_header->dataunit_user_len = dataunit_user_len ;
	datapage_header->next_dataunit += sizeof(struct AppendonlyDataUnitHeader) ;
	if( datapage_header->dataunit_user_header_size > 0 )
	{
		memcpy( datapage_header->next_dataunit , dataunit_user_header , datapage_header->dataunit_user_header_size ); datapage_header->next_dataunit += datapage_header->dataunit_user_header_size ;
	}
	if( dataunit_user_body_len > 0 )
	{
		memcpy( datapage_header->next_dataunit , dataunit_user_body , (size_t)dataunit_user_body_len ); datapage_header->next_dataunit += dataunit_user_body_len ;
	}
	if( datapage_header->post_dataunit_reserved_len == 1 )
	{
		*(datapage_header->next_dataunit) = '\0' ; datapage_header->next_dataunit++;
	}
	else if( datapage_header->post_dataunit_reserved_len > 1 )
	{
		memset( datapage_header->next_dataunit , 0x00 , datapage_header->post_dataunit_reserved_len ); datapage_header->next_dataunit += datapage_header->post_dataunit_reserved_len ;
	}
	
	return curr_dataunit;
}

char *PeekAppendonlyDataUnit( char *datapage , char *dataunit , void **dataunit_user_header , char **dataunit_user_body , uint64_t *dataunit_user_body_len )
{
	struct AppendonlyDataPageHeader	*datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	char				*p = NULL ;
	
	if( datapage_header->datapage_header_version != AODATAPAGE_HEADER_VERSION )
	{
		_tls_datapage_last_error = AODATAPAGE_ERROR_VERSION_NOT_MATCH ;
		return NULL;
	}

	if( dataunit == NULL )
	{
		dataunit = datapage + sizeof(struct AppendonlyDataPageHeader) + datapage_header->datapage_user_header_size ;
	}
	else if( dataunit == datapage_header->next_dataunit )
	{
		_tls_datapage_last_error = AODATAPAGE_INFO_TRAVEL_OUT ;
		return NULL;
	}
	
	if( dataunit_user_header )
		(*dataunit_user_header) = GetAppendonlyDataUnitUserHeader(dataunit) ;
	p = GetAppendonlyDataUnitUserBody( datapage , dataunit , dataunit_user_body_len ) ;
	if( dataunit_user_body )
		(*dataunit_user_body) = p ;
	
	return dataunit;
}

char *NextAppendonlyDataUnit( char *datapage , char *dataunit )
{
	struct AppendonlyDataPageHeader	*datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	struct AppendonlyDataUnitHeader	*dataunit_header = NULL ;
	
	if( dataunit == NULL )
	{
		dataunit = datapage + sizeof(struct AppendonlyDataPageHeader) + datapage_header->datapage_user_header_size ;
	}
	else if( dataunit == datapage_header->next_dataunit )
	{
		_tls_datapage_last_error = AODATAPAGE_INFO_TRAVEL_OUT ;
		return NULL;
	}
	else
	{
		dataunit_header = (struct AppendonlyDataUnitHeader *)dataunit ;
		dataunit += sizeof(struct AppendonlyDataUnitHeader) + dataunit_header->dataunit_user_len ;
	}
	
	return dataunit;
}

char *TravelAppendonlyDataUnit( char *datapage , char *dataunit , void **dataunit_user_header , char **dataunit_user_body , uint64_t *dataunit_user_body_len )
{
	dataunit = NextAppendonlyDataUnit( datapage , dataunit ) ;
	if( dataunit == NULL )
		return NULL;
	
	dataunit = PeekAppendonlyDataUnit( datapage , dataunit , dataunit_user_header , dataunit_user_body , dataunit_user_body_len ) ;
	if( dataunit == NULL )
		return NULL;
	
	return dataunit;
}

void *GetAppendonlyDataUnitUserHeader( char *dataunit )
{
	return dataunit+sizeof(struct AppendonlyDataUnitHeader);
}

char *GetAppendonlyDataUnitUserBody( char *datapage , char *dataunit , uint64_t *dataunit_user_body_len )
{
	struct AppendonlyDataPageHeader	*datapage_header = (struct AppendonlyDataPageHeader *)datapage ;
	struct AppendonlyDataUnitHeader	*dataunit_header = (struct AppendonlyDataUnitHeader *)dataunit ;
	
	if( dataunit_user_body_len )
		(*dataunit_user_body_len) = dataunit_header->dataunit_user_len ;
	return dataunit+sizeof(struct AppendonlyDataUnitHeader)+datapage_header->dataunit_user_header_size;
}

